Add msgpack-jackson3 module for Jackson 3.x support #4
/gemini review
Code Review
This pull request introduces the msgpack-jackson3 module, providing Jackson 3.x support for MessagePack, along with corresponding tests, benchmarks, and documentation. A critical feedback item points out a potential memory leak in MessagePackParser where the messageUnpackerHolder ThreadLocal retains a reference to the input source after the parser is closed, and suggests clearing it to prevent resource leaks.
if (ownsThreadLocalUnpacker) {
    Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get();
    if (tuple != null && tuple.first() instanceof byte[]) {
        // close() calls ArrayBufferInput.close() which sets buffer = null,
        // releasing the byte[] payload reference held by the unpacker's input.
        // The unpacker itself is kept alive for reuse on the next parse.
        tuple.second().close();
        messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
    }
}
The messageUnpackerHolder ThreadLocal retains a reference to the input source (src) via the Tuple even after the parser is closed, unless the source is a byte[]. If the source is an InputStream, the closed InputStream reference (and any resources it holds) will be leaked on the thread until the next parse operation.
We should clear the reference to the input source on close by setting the first element of the Tuple to null regardless of the source type.
if (ownsThreadLocalUnpacker) {
Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
if (tuple.first() instanceof byte[]) {
// close() calls ArrayBufferInput.close() which sets buffer = null,
// releasing the byte[] payload reference held by the unpacker's input.
// The unpacker itself is kept alive for reuse on the next parse.
tuple.second().close();
}
messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
}
}
@gemini-code-assist Partially addressed. For byte[] sources and when AUTO_CLOSE_SOURCE is enabled, the source is now cleared on close. For AUTO_CLOSE_SOURCE disabled + InputStream, unconditionally clearing breaks sequential reads: after close the next parse sees cachedSrc (null) != src and calls reset(), which dismisses the unpacker's read-ahead buffer and causes the second sequential read to return wrong data. testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest demonstrates this regression.
Thank you for the clarification, komamitsu. That makes perfect sense. Given that AUTO_CLOSE_SOURCE is disabled, we must preserve the MessageUnpacker state to support sequential reads, and the current logic correctly handles that requirement. I agree that the proposed change would indeed cause a regression in that scenario. Your current implementation is correct for the requirements.
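The read-ahead problem described in this thread can be illustrated with JDK classes alone. The sketch below is only an analogy, not msgpack-java code: `BufferedInputStream` stands in for the unpacker's read-ahead buffer, and creating a fresh wrapper stands in for calling `reset()`. The class and method names are hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ReadAheadSketch
{
    // Analogy for reset(): each read goes through a new buffering wrapper,
    // so bytes the first wrapper pulled ahead are dropped with it.
    static int[] readWithFreshWrapperEachTime(byte[] data)
    {
        InputStream source = new ByteArrayInputStream(data);
        try {
            int first = new BufferedInputStream(source).read();
            int second = new BufferedInputStream(source).read();
            return new int[] {first, second};
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Analogy for same-stream reuse: the wrapper, and its buffer, is kept.
    static int[] readWithReusedWrapper(byte[] data)
    {
        InputStream source = new ByteArrayInputStream(data);
        BufferedInputStream buffered = new BufferedInputStream(source);
        try {
            return new int[] {buffered.read(), buffered.read()};
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        int[] fresh = readWithFreshWrapperEachTime(new byte[] {1, 2});
        int[] reused = readWithReusedWrapper(new byte[] {1, 2});
        System.out.println(fresh[0] + ", " + fresh[1]);   // 1, -1
        System.out.println(reused[0] + ", " + reused[1]); // 1, 2
    }
}
```

With the fresh wrapper the second read hits end-of-stream, because the first wrapper had already consumed the remaining byte into its private buffer. That is the same kind of data loss the sequential-read test guards against.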
cf4baac to ddc327e
Code Review
This pull request introduces the msgpack-jackson3 module, adding support for the MessagePack format in Jackson 3.x (requiring Java 17+). It includes core components such as the factory, generator, parser, mapper, custom serialization/deserialization for extension types, and JMH benchmarks. The review feedback highlights a potential memory leak in MessagePackParser due to ThreadLocal retention of stream sources, and suggests optimizing TimestampExtensionModule by manually encoding and decoding Instant timestamps to avoid high allocation overhead and GC pressure.
    Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get();
    if (tuple != null && tuple.first() instanceof byte[]) {
        // close() calls ArrayBufferInput.close() which sets buffer = null,
        // releasing the byte[] payload reference held by the unpacker's input.
        // The unpacker itself is kept alive for reuse on the next parse.
        tuple.second().close();
        messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
    }
}
The messageUnpackerHolder ThreadLocal retains a reference to the source object (e.g., ByteArrayInputStream or other stream types) even after the parser is closed. While byte[] sources are explicitly cleared, ByteArrayInputStream and other InputStream sources are not, which can lead to significant memory leaks if they hold large buffers. Consider using a WeakReference for the source object in the Tuple to allow it to be garbage collected when no longer referenced elsewhere, while still supporting sequential reads.
@gemini-code-assist Declining — we deliberately removed WeakReference. The byte[] source is cleared eagerly on close; InputStream sources with AUTO_CLOSE_SOURCE enabled are also cleared. For AUTO_CLOSE_SOURCE disabled, keeping the strong reference is intentional for same-stream reuse detection.
Thank you for the clarification, komamitsu. That makes sense regarding the intentional reuse of the InputStream and the AUTO_CLOSE_SOURCE behavior. I appreciate the context on why the strong reference is necessary for your use case.
@Override
public void serialize(Instant value, JsonGenerator gen, SerializationContext provider)
{
    try {
        // Per-call allocation is a known limitation carried from the v2 module.
        // Manually encoding the timestamp bytes would avoid it but duplicates
        // msgpack-core's timestamp logic. Tracked as a future optimization.
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        try (MessagePacker packer = MessagePack.newDefaultPacker(os)) {
            packer.packTimestamp(value);
        }
        try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(os.toByteArray())) {
            ExtensionTypeHeader header = unpacker.unpackExtensionTypeHeader();
            byte[] bytes = unpacker.readPayload(header.getLength());

            MessagePackExtensionType extensionType = new MessagePackExtensionType(EXT_TYPE, bytes);
            gen.writePOJO(extensionType);
        }
    }
    catch (IOException e) {
        throw _wrapIOFailure(provider, e);
    }
}
The InstantSerializer allocates a ByteArrayOutputStream, MessagePacker, and MessageUnpacker on every serialization call. This introduces significant allocation overhead and GC pressure. Since the MessagePack timestamp encoding format is straightforward, we can manually encode the timestamp bytes directly to avoid these allocations entirely.
@Override
public void serialize(Instant value, JsonGenerator gen, SerializationContext provider)
{
try {
long sec = value.getEpochSecond();
int nsec = value.getNano();
byte[] bytes;
if (sec >= 0 && (sec >> 34) == 0) {
if (nsec == 0 && (sec >> 32) == 0) {
bytes = new byte[4];
bytes[0] = (byte) (sec >> 24);
bytes[1] = (byte) (sec >> 16);
bytes[2] = (byte) (sec >> 8);
bytes[3] = (byte) sec;
} else {
bytes = new byte[8];
long val = ((long) nsec << 34) | sec;
bytes[0] = (byte) (val >> 56);
bytes[1] = (byte) (val >> 48);
bytes[2] = (byte) (val >> 40);
bytes[3] = (byte) (val >> 32);
bytes[4] = (byte) (val >> 24);
bytes[5] = (byte) (val >> 16);
bytes[6] = (byte) (val >> 8);
bytes[7] = (byte) val;
}
} else {
bytes = new byte[12];
bytes[0] = (byte) (nsec >> 24);
bytes[1] = (byte) (nsec >> 16);
bytes[2] = (byte) (nsec >> 8);
bytes[3] = (byte) nsec;
bytes[4] = (byte) (sec >> 56);
bytes[5] = (byte) (sec >> 48);
bytes[6] = (byte) (sec >> 40);
bytes[7] = (byte) (sec >> 32);
bytes[8] = (byte) (sec >> 24);
bytes[9] = (byte) (sec >> 16);
bytes[10] = (byte) (sec >> 8);
bytes[11] = (byte) sec;
}
gen.writePOJO(new MessagePackExtensionType(EXT_TYPE, bytes));
}
catch (IOException e) {
throw _wrapIOFailure(provider, e);
}
}
@gemini-code-assist Per-call allocation is a known limitation already noted in a comment in the code. This module is an optional extension, not a hot path. Declining in favour of keeping the implementation simple and readable.
I understand your perspective. Given that this module is an optional extension and not a hot path, prioritizing simplicity and readability over micro-optimizations is a reasonable design choice. I will leave this as is.
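For readers following the trade-off in this thread, the three payload layouts of the MessagePack timestamp extension (4, 8, and 12 bytes) can be sketched with `ByteBuffer`, which is big-endian by default. This is an illustrative round trip under that reading of the format, not the code in the PR or in msgpack-core; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

final class TimestampPayloadSketch
{
    // Picks the smallest payload that can hold the value.
    static byte[] encode(Instant value)
    {
        long sec = value.getEpochSecond();
        int nsec = value.getNano();
        if ((sec >>> 34) == 0) { // non-negative and fits in 34 bits
            if (nsec == 0 && (sec >>> 32) == 0) {
                // 4 bytes: unsigned 32-bit seconds
                return ByteBuffer.allocate(4).putInt((int) sec).array();
            }
            // 8 bytes: 30-bit nanoseconds in the high bits, 34-bit seconds below
            return ByteBuffer.allocate(8).putLong(((long) nsec << 34) | sec).array();
        }
        // 12 bytes: 32-bit nanoseconds followed by signed 64-bit seconds
        return ByteBuffer.allocate(12).putInt(nsec).putLong(sec).array();
    }

    static Instant decode(byte[] payload)
    {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        switch (payload.length) {
            case 4:
                return Instant.ofEpochSecond(buf.getInt() & 0xFFFFFFFFL);
            case 8: {
                long packed = buf.getLong();
                return Instant.ofEpochSecond(packed & 0x3FFFFFFFFL, packed >>> 34);
            }
            case 12: {
                int nsec = buf.getInt();
                long sec = buf.getLong();
                return Instant.ofEpochSecond(sec, nsec);
            }
            default:
                throw new IllegalArgumentException("Unexpected timestamp payload length: " + payload.length);
        }
    }

    public static void main(String[] args)
    {
        Instant t = Instant.ofEpochSecond(1, 500);
        System.out.println(encode(t).length);      // 8
        System.out.println(decode(encode(t)).equals(t)); // true
    }
}
```

Even in this compact form the layout logic duplicates what msgpack-core already implements, which is the maintenance cost the author cites for keeping the simpler allocation-based version.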
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt)
{
    try {
        // Per-call allocation is a known limitation — see serialize() above.
        MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
        if (ext.getType() != EXT_TYPE) {
            ctxt.reportInputMismatch(Instant.class,
                    "Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF);
            return null; // unreachable
        }
        try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(ext.getData())) {
            return unpacker.unpackTimestamp(new ExtensionTypeHeader(EXT_TYPE, ext.getData().length));
        }
    }
    catch (IOException e) {
        throw _wrapIOFailure(ctxt, e);
    }
}
The InstantDeserializer allocates a MessageUnpacker on every deserialization call to decode the timestamp bytes. We can decode the bytes directly from ext.getData() to avoid this allocation overhead entirely.
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt)
{
try {
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext.getType() != EXT_TYPE) {
ctxt.reportInputMismatch(Instant.class,
"Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF);
return null; // unreachable
}
byte[] bytes = ext.getData();
long sec;
int nsec;
if (bytes.length == 4) {
sec = ((long) (bytes[0] & 0xFF) << 24)
| ((long) (bytes[1] & 0xFF) << 16)
| ((long) (bytes[2] & 0xFF) << 8)
| (bytes[3] & 0xFF);
nsec = 0;
} else if (bytes.length == 8) {
long val = ((long) (bytes[0] & 0xFF) << 56)
| ((long) (bytes[1] & 0xFF) << 48)
| ((long) (bytes[2] & 0xFF) << 40)
| ((long) (bytes[3] & 0xFF) << 32)
| ((long) (bytes[4] & 0xFF) << 24)
| ((long) (bytes[5] & 0xFF) << 16)
| ((long) (bytes[6] & 0xFF) << 8)
| (bytes[7] & 0xFF);
nsec = (int) (val >>> 34);
sec = val & 0x3FFFFFFFFL;
} else if (bytes.length == 12) {
nsec = ((bytes[0] & 0xFF) << 24)
| ((bytes[1] & 0xFF) << 16)
| ((bytes[2] & 0xFF) << 8)
| (bytes[3] & 0xFF);
sec = ((long) bytes[4] << 56)
| ((long) (bytes[5] & 0xFF) << 48)
| ((long) (bytes[6] & 0xFF) << 40)
| ((long) (bytes[7] & 0xFF) << 32)
| ((long) (bytes[8] & 0xFF) << 24)
| ((long) (bytes[9] & 0xFF) << 16)
| ((long) (bytes[10] & 0xFF) << 8)
| (bytes[11] & 0xFF);
} else {
throw new IOException("Unexpected extension length for Instant: " + bytes.length);
}
return Instant.ofEpochSecond(sec, nsec);
}
catch (IOException e) {
throw _wrapIOFailure(ctxt, e);
}
}
@gemini-code-assist Same as above — per-call allocation is a known limitation. Declining.
I understand your perspective regarding the per-call allocation limitation. Given that this is a known design constraint carried over from the Jackson 2.x module for consistency and to avoid duplicating msgpack-core's internal timestamp logic, I will not pursue this further. Thank you for the clarification.
Pull request overview
Adds a new msgpack-jackson3 module that ports the Jackson integration to Jackson 3.x / Java 17+, plus CI/build wiring, tests, benchmarks, and module-specific documentation.
Changes:
- Adds Jackson 3-compatible MessagePack factory, parser, generator, mapper, serializers, timestamp extension support, and read/write contexts.
- Adds JUnit tests and JMH benchmarks for the new module.
- Updates SBT/CI configuration to include msgpack-jackson3 only on Java 17+.
Reviewed changes
Copilot reviewed 40 out of 41 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| project/plugins.sbt | Adds the JMH SBT plugin. |
| build.sbt | Adds Java 17-gated msgpack-jackson3 project configuration. |
| .github/workflows/CI.yml | Runs the new module in CI on Java 17+. |
| msgpack-jackson3/README.md | Documents Jackson 3 module usage and migration notes. |
| msgpack-jackson3/src/main/java/org/msgpack/jackson/dataformat/* | Adds Jackson 3 MessagePack runtime implementation. |
| msgpack-jackson3/src/test/java/org/msgpack/jackson/dataformat/* | Adds tests for the new module and regression behavior. |
| msgpack-jackson3/src/jmh/java/org/msgpack/jackson/dataformat/benchmark/* | Adds JMH benchmark harnesses. |
| msgpack-jackson3/src/jmh/java/org/msgpack/jackson/dataformat/benchmark/model/* | Adds benchmark data model classes. |
| plans/*.md | Adds internal notes on pre-existing issues, profiling, and ThreadLocal memory benchmark results. |
super(StreamReadConstraints.defaults(), StreamWriteConstraints.defaults(),
        ErrorReportConfiguration.defaults(), 0, 0);
@Copilot These are format-specific read/write feature masks (equivalent to CBORReadFeature.collectDefaults() / CBORWriteFeature.collectDefaults() in CBOR), not StreamReadFeature/StreamWriteFeature bitmasks. The base class initialises StreamReadFeature/StreamWriteFeature from their own collectDefaults() independently of these parameters. AUTO_CLOSE_SOURCE remains enabled by default.
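The distinction drawn in this reply, two independent bitmasks where one is computed by the base class regardless of the constructor arguments, can be sketched without Jackson on the classpath. Everything below is hypothetical (`StreamFeature`, `SOME_OPT_IN_FEATURE`, `FactoryBase`); only the name `AUTO_CLOSE_SOURCE` and the `collectDefaults()` pattern echo the discussion, and the real base class may differ in detail.

```java
final class FeatureMaskSketch
{
    // Hypothetical stream-level features, each owning one bit.
    enum StreamFeature
    {
        AUTO_CLOSE_SOURCE(true),
        SOME_OPT_IN_FEATURE(false);

        private final boolean enabledByDefault;

        StreamFeature(boolean enabledByDefault)
        {
            this.enabledByDefault = enabledByDefault;
        }

        int mask()
        {
            return 1 << ordinal();
        }

        boolean enabledIn(int flags)
        {
            return (flags & mask()) != 0;
        }

        // ORs together the bits of all default-enabled features.
        static int collectDefaults()
        {
            int flags = 0;
            for (StreamFeature f : values()) {
                if (f.enabledByDefault) {
                    flags |= f.mask();
                }
            }
            return flags;
        }
    }

    // Hypothetical base class: the stream mask never depends on the format mask.
    static final class FactoryBase
    {
        final int streamReadFeatures;
        final int formatReadFeatures;

        FactoryBase(int formatReadFeatures)
        {
            this.streamReadFeatures = StreamFeature.collectDefaults();
            this.formatReadFeatures = formatReadFeatures;
        }
    }

    public static void main(String[] args)
    {
        FactoryBase factory = new FactoryBase(0); // format mask of 0, as in the PR
        System.out.println(StreamFeature.AUTO_CLOSE_SOURCE.enabledIn(factory.streamReadFeatures)); // true
    }
}
```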
super(StreamReadConstraints.defaults(), StreamWriteConstraints.defaults(),
        ErrorReportConfiguration.defaults(), 0, 0);
@Copilot Same as the note above — the two 0 values are format-specific feature masks, not StreamReadFeature/StreamWriteFeature. Stream feature defaults are set independently by the base class.
… enabled When AUTO_CLOSE_SOURCE is enabled, the stream is closed but the source reference in the ThreadLocal Tuple was never cleared. For AUTO_CLOSE_SOURCE disabled + InputStream, the reference must be kept so the next parse on the same thread can detect same-stream reuse and avoid resetting the unpacker (which would dismiss its read-ahead buffer and corrupt sequential reads). Add testInputStreamSequentialReadsWithAutoCloseSourceDisabled to verify that sequential reads from the same InputStream work correctly.
/gemini review
Code Review
This pull request introduces the msgpack-jackson3 module, providing Jackson 3.x support for MessagePack on Java 17+. It includes the core factory, parser, generator, and mapper implementations, along with a timestamp extension module, unit tests, and JMH benchmarks. The reviewer feedback focuses on preventing memory leaks in MessagePackParser by wrapping the cached input source in a WeakReference within the ThreadLocal unpacker holder, and resolving potential NullPointerExceptions in both MessagePackParser and TimestampExtensionModule when handling null values.
{
    // Retained heap per idle thread: ~0.2 KB (MessageUnpacker with cleared input buffer).
    // Negligible compared to Jackson's own per-thread buffer retention.
    private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>();
Using a strong reference to the input source (Object) in the ThreadLocal can lead to significant memory leaks in long-running applications (e.g., when threads in a pool are kept alive but the parsed stream/buffer is no longer needed). Wrapping the source object in a WeakReference prevents the ThreadLocal from keeping the source alive after the parsing operation is complete.
private static final ThreadLocal<Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>();
@gemini-code-assist Declining — we deliberately use strong references. For byte[] sources and AUTO_CLOSE_SOURCE enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, the strong reference is intentional for same-stream reuse detection: clearing it would cause reset() to be called on the next parse, dismissing the unpacker's read-ahead buffer and corrupting sequential reads. Covered by testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, @komamitsu. Your explanation regarding the intentional use of strong references for same-stream reuse detection and the associated test coverage in MessagePackParserTest is clear. Given that this behavior is deliberate and validated, I have no further concerns regarding this implementation.
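The reuse-detection policy discussed in this thread can be condensed into a small stand-alone sketch. It tracks only the cached source, not a real `MessageUnpacker`, and the names `ReuseDetectionSketch`, `openParser`, and `closeParser` are hypothetical. One simplification to note: on the very first parse the real constructor creates a new unpacker rather than resetting one, whereas this sketch simply reports `true`.

```java
final class ReuseDetectionSketch
{
    // Hypothetical stand-in for the source half of the per-thread Tuple.
    private static final ThreadLocal<Object> cachedSource = new ThreadLocal<>();

    /** Returns true when the buffered reader would have to be reset for this parse. */
    static boolean openParser(Object src, boolean autoCloseSource)
    {
        Object cached = cachedSource.get();
        // Same shape as the condition under review: identity comparison, not equals().
        boolean reset = autoCloseSource || cached != src || src instanceof byte[];
        cachedSource.set(src);
        return reset;
    }

    /** Close-time policy: clear for byte[] sources or auto-close, otherwise keep. */
    static void closeParser(boolean autoCloseSource)
    {
        Object cached = cachedSource.get();
        if (cached instanceof byte[] || autoCloseSource) {
            cachedSource.set(null);
        }
    }

    public static void main(String[] args)
    {
        Object stream = new Object(); // stands in for an InputStream
        System.out.println(openParser(stream, false)); // true: nothing cached yet
        closeParser(false);                            // reference is kept
        System.out.println(openParser(stream, false)); // false: same stream, no reset
    }
}
```

If `closeParser` cleared the reference unconditionally, the second call would return `true`, which corresponds to the unwanted `reset()` the author describes.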
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
    messageUnpacker = MessagePack.newDefaultUnpacker(input);
}
else {
    // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE,
    // MessagePackParser needs to use the MessageUnpacker that has the same InputStream
    // since it has buffer which has loaded the InputStream data ahead.
    // However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
    Object cachedSrc = messageUnpackerTuple.first();
    if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) {
        // reset() replaces the internal MessageBufferInput and clears the unpacker's
        // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes
        // unreachable here (we discard the return value), so its byte[] is GC-eligible.
        messageUnpackerTuple.second().reset(input);
    }
    messageUnpacker = messageUnpackerTuple.second();
}
messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker));
Update the constructor to retrieve and store the input source using a WeakReference to prevent memory leaks.
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
    messageUnpacker = MessagePack.newDefaultUnpacker(input);
}
else {
    // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE,
    // MessagePackParser needs to use the MessageUnpacker that has the same InputStream
    // since it has buffer which has loaded the InputStream data ahead.
    // However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
    java.lang.ref.WeakReference<Object> ref = messageUnpackerTuple.first();
    Object cachedSrc = ref == null ? null : ref.get();
    if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) {
        // reset() replaces the internal MessageBufferInput and clears the unpacker's
        // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes
        // unreachable here (we discard the return value), so its byte[] is GC-eligible.
        messageUnpackerTuple.second().reset(input);
    }
    messageUnpacker = messageUnpackerTuple.second();
}
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(src), messageUnpacker));
@gemini-code-assist Declining — we deliberately use strong references. For byte[] sources and AUTO_CLOSE_SOURCE enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, the strong reference is intentional for same-stream reuse detection: clearing it would cause reset() to be called on the next parse, dismissing the unpacker's read-ahead buffer and corrupting sequential reads. Covered by testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, komamitsu. I understand the requirement for strong references to support sequential stream reuse and the specific handling of AUTO_CLOSE_SOURCE for byte[] and InputStream sources. Given that this behavior is verified by testInputStreamSequentialReadsWithAutoCloseSourceDisabled, I agree that the current implementation is correct for your use case.
Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
    if (tuple.first() instanceof byte[]) {
        // close() calls ArrayBufferInput.close() which sets buffer = null,
        // releasing the byte[] payload reference held by the unpacker's input.
        // The unpacker itself is kept alive for reuse on the next parse.
        tuple.second().close();
        messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
    }
    else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) {
        // Stream is already closed above; release the reference so it doesn't
        // linger on the thread until the next parse.
        messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
    }
Update _closeInput to handle the WeakReference wrapper when checking and clearing the cached source.
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
java.lang.ref.WeakReference<Object> ref = tuple.first();
Object cachedSrc = ref == null ? null : ref.get();
if (cachedSrc instanceof byte[]) {
// close() calls ArrayBufferInput.close() which sets buffer = null,
// releasing the byte[] payload reference held by the unpacker's input.
// The unpacker itself is kept alive for reuse on the next parse.
tuple.second().close();
messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
}
else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) {
// Stream is already closed above; release the reference so it doesn't
// linger on the thread until the next parse.
messageUnpackerHolder.set(new Tuple<>(null, tuple.second()));
}
}
@gemini-code-assist Declining — we deliberately use strong references. For byte[] sources and AUTO_CLOSE_SOURCE enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, the strong reference is intentional for same-stream reuse detection: clearing it would cause reset() to be called on the next parse, dismissing the unpacker's read-ahead buffer and corrupting sequential reads. Covered by testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, komamitsu. I understand the requirement for strong references to support sequential stream reuse when AUTO_CLOSE_SOURCE is disabled. Your explanation regarding the testInputStreamSequentialReadsWithAutoCloseSourceDisabled test case confirms that this behavior is intentional and necessary for the unpacker's read-ahead buffer. I have no further concerns regarding this implementation.
if (isObjectValueSet) {
    streamReadContext.setCurrentName(deserializedExtensionTypeValue().toString());
    nextToken = JsonToken.PROPERTY_NAME;
}
If a custom extension deserializer returns null, calling .toString() on it will throw a NullPointerException. Add a null check to handle this case safely.
if (isObjectValueSet) {
Object deserialized = deserializedExtensionTypeValue();
streamReadContext.setCurrentName(deserialized == null ? null : deserialized.toString());
nextToken = JsonToken.PROPERTY_NAME;
}
@gemini-code-assist Declining — a custom deserializer returning null for use as a map key or string value is caller misuse. We don't defensively guard against every broken user-provided deserializer implementation.
I understand your perspective. Given that custom deserializers are expected to return valid objects, enforcing strict contracts on user-provided implementations is a reasonable design choice to avoid unnecessary overhead. I will keep this in mind for future reviews.
case EXT:
    try {
        return deserializedExtensionTypeValue().toString();
    }
There was a problem hiding this comment.
@gemini-code-assist Declining — a custom deserializer returning null for use as a map key or string value is caller misuse. We don't defensively guard against every broken user-provided deserializer implementation.
I understand your perspective. My intention was to suggest a defensive programming practice, but I respect your design decision to rely on the caller to provide valid deserializers. I will keep this in mind for future reviews.
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext.getType() != EXT_TYPE) {
If p.readValueAs returns null (e.g., if the input value is null), calling ext.getType() will throw a NullPointerException. Add a null check to prevent this.
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext == null) {
    return null;
}
if (ext.getType() != EXT_TYPE) {
@gemini-code-assist Declining — a custom deserializer returning null for use as a map key or string value is caller misuse. We don't defensively guard against every broken user-provided deserializer implementation.
I understand your perspective regarding defensive programming in custom deserializers. Given that this is a library intended for use by other developers, keeping the implementation focused on its primary responsibility while assuming correct usage by the caller is a valid design choice. I will defer to your judgment on this.
if (currentParentElementIndex == 0) {
    isElementsClosed = true;
    currentParentElementIndex = parent.parentIndex;
    currentState = IN_ROOT;
    return;
}

currentParentElementIndex = parent.parentIndex;
assert currentParentElementIndex >= 0;
Node currentParent = nodes.get(currentParentElementIndex);
@Copilot Fixed — endCurrentContainer() now uses parent.parentIndex == -1 to detect root containers instead of currentParentElementIndex == 0, which only worked for the first root. Added testMultipleRootContainersWithoutFlush in MessagePackGeneratorTest to cover this.
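Why the index check only works for the first root can be seen in a reduced model of the node list. The sketch below is hypothetical (`RootDetectionSketch`, `addContainer`, and both predicate names are illustrative); it assumes, as the reply implies, that containers are appended to one list and that a root container records a parent index of -1.

```java
import java.util.ArrayList;
import java.util.List;

final class RootDetectionSketch
{
    static final class Node
    {
        final int parentIndex; // -1 marks a root container

        Node(int parentIndex)
        {
            this.parentIndex = parentIndex;
        }
    }

    private final List<Node> nodes = new ArrayList<>();

    /** Appends a container and returns its index in the node list. */
    int addContainer(int parentIndex)
    {
        nodes.add(new Node(parentIndex));
        return nodes.size() - 1;
    }

    // Old check: only true for the very first container ever appended.
    boolean isRootByIndex(int index)
    {
        return index == 0;
    }

    // New check: true for any container without a parent.
    boolean isRootByParent(int index)
    {
        return nodes.get(index).parentIndex == -1;
    }

    public static void main(String[] args)
    {
        RootDetectionSketch generator = new RootDetectionSketch();
        int firstRoot = generator.addContainer(-1);  // index 0
        generator.addContainer(firstRoot);           // index 1, nested
        int secondRoot = generator.addContainer(-1); // index 2, written before any flush
        System.out.println(generator.isRootByIndex(secondRoot));  // false
        System.out.println(generator.isRootByParent(secondRoot)); // true
    }
}
```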
public class PackageVersion
        implements Versioned
{
    public static final Version VERSION = new Version(0, 9, 12, null, "org.msgpack", "jackson-dataformat-msgpack-jackson3");
@Copilot Acknowledged — this is a known limitation tracked in our fix plan. The equivalent of Jackson's maven-replacer-plugin + .java.in template approach for sbt would be sbt-buildinfo, and will be addressed before publication.
/gemini review
Code Review
This pull request adds the msgpack-jackson3 module to support MessagePack in Jackson 3.x, including core parser, generator, and mapper implementations migrated to the tools.jackson package, alongside timestamp extension support, benchmarks, and tests. The review feedback identifies critical memory leak risks in MessagePackParser due to caching strong references to source streams in a ThreadLocal, suggesting wrapping them in WeakReference. Additionally, the reviewer recommends adding defensive null checks in MessagePackParser and TimestampExtensionModule to prevent potential NullPointerException crashes when deserialized values are null.
{
    // Retained heap per idle thread: ~0.2 KB (MessageUnpacker with cleared input buffer).
    // Negligible compared to Jackson's own per-thread buffer retention.
    private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>();
Using a strong reference to the source object (like InputStream) in a ThreadLocal can lead to severe memory leaks if AUTO_CLOSE_SOURCE is disabled, as the stream and its associated resources will linger on the thread indefinitely. Wrapping the cached source in a WeakReference allows the garbage collector to reclaim the stream when it is no longer referenced elsewhere, while still preserving the same-stream reuse optimization for sequential reads.
private static final ThreadLocal<Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>();
@gemini-code-assist Same as the note above — strong references are intentional. See replies on the earlier review for the full rationale.
I understand your rationale for using strong references in the ThreadLocal for performance reasons, particularly given the negligible memory footprint compared to Jackson's internal buffers. Since this is an intentional design choice for the msgpack-jackson3 module, I will withdraw my suggestion. Thank you for the clarification.
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
    messageUnpacker = MessagePack.newDefaultUnpacker(input);
}
else {
    // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE,
    // MessagePackParser needs to use the MessageUnpacker that has the same InputStream
    // since it has buffer which has loaded the InputStream data ahead.
    // However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
    Object cachedSrc = messageUnpackerTuple.first();
    if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) {
        // reset() replaces the internal MessageBufferInput and clears the unpacker's
        // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes
        // unreachable here (we discard the return value), so its byte[] is GC-eligible.
        messageUnpackerTuple.second().reset(input);
    }
    messageUnpacker = messageUnpackerTuple.second();
}
messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker));
Update the constructor to retrieve and store the cached source using WeakReference to prevent memory leaks.
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); | |
| Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first().get(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(src), messageUnpacker)); |
@gemini-code-assist Same as the note above — strong references are intentional. See replies on the earlier review for the full rationale.
I understand. Since the strong references are intentional for performance or design reasons in this specific context, I will respect that decision. Thank you for clarifying the rationale.
| if (ownsThreadLocalUnpacker) { | ||
| Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get(); | ||
| if (tuple != null) { | ||
| if (tuple.first() instanceof byte[]) { | ||
| // close() calls ArrayBufferInput.close() which sets buffer = null, | ||
| // releasing the byte[] payload reference held by the unpacker's input. | ||
| // The unpacker itself is kept alive for reuse on the next parse. | ||
| tuple.second().close(); | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) { | ||
| // Stream is already closed above; release the reference so it doesn't | ||
| // linger on the thread until the next parse. | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| // else: InputStream with AUTO_CLOSE_SOURCE disabled — keep the reference | ||
| // so the next parse on the same thread can detect same-stream reuse and | ||
| // avoid resetting the unpacker (which would discard its read-ahead buffer). | ||
| } | ||
| } |
Update _closeInput() to safely retrieve the cached source from the WeakReference and clear it appropriately to prevent resource leaks.
if (ownsThreadLocalUnpacker) {
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
Object cachedSrc = tuple.first().get();
if (cachedSrc instanceof byte[]) {
// close() calls ArrayBufferInput.close() which sets buffer = null,
// releasing the byte[] payload reference held by the unpacker's input.
// The unpacker itself is kept alive for reuse on the next parse.
tuple.second().close();
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures) || cachedSrc == null) {
// Stream is already closed above; release the reference so it doesn't
// linger on the thread until the next parse.
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
// else: InputStream with AUTO_CLOSE_SOURCE disabled — keep the reference
// so the next parse on the same thread can detect same-stream reuse and
// avoid resetting the unpacker (which would discard its read-ahead buffer).
}
}
@gemini-code-assist Same as the note above — strong references are intentional. See replies on the earlier review for the full rationale.
I understand your point regarding the intentional use of strong references to maintain the unpacker state across parses. Given that the per-thread overhead is minimal and well-understood, I will defer to your design choice. Thank you for the clarification.
| if (isObjectValueSet) { | ||
| streamReadContext.setCurrentName(deserializedExtensionTypeValue().toString()); | ||
| nextToken = JsonToken.PROPERTY_NAME; | ||
| } |
If a custom extension deserializer is registered and returns null, calling .toString() on the result will throw a NullPointerException. Adding a null check prevents this crash.
if (isObjectValueSet) {
Object deserialized = deserializedExtensionTypeValue();
streamReadContext.setCurrentName(deserialized == null ? null : deserialized.toString());
nextToken = JsonToken.PROPERTY_NAME;
}
@gemini-code-assist Declining — same reasoning as the earlier review: caller misuse, not a case we need to defensively guard against.
I understand your perspective. Since this is intended to be a library where the caller is responsible for the integrity of their custom deserializers, I will respect your decision to decline the defensive check. Thank you for the clarification.
| try { | ||
| return deserializedExtensionTypeValue().toString(); | ||
| } | ||
| catch (IOException e) { | ||
| throw _wrapIOFailure(e); | ||
| } |
If deserializedExtensionTypeValue() returns null, calling .toString() on it will throw a NullPointerException. Adding a null check ensures safe handling.
| try { | |
| return deserializedExtensionTypeValue().toString(); | |
| } | |
| catch (IOException e) { | |
| throw _wrapIOFailure(e); | |
| } | |
| try { | |
| Object deserialized = deserializedExtensionTypeValue(); | |
| return deserialized == null ? "null" : deserialized.toString(); | |
| } | |
| catch (IOException e) { | |
| throw _wrapIOFailure(e); | |
| } |
@gemini-code-assist Declining — same reasoning as the earlier review: caller misuse, not a case we need to defensively guard against.
I understand your perspective. Since this is a library, you are prioritizing performance and avoiding unnecessary overhead for the caller. I will respect your decision to decline this defensive check.
| MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class); | ||
| if (ext.getType() != EXT_TYPE) { | ||
| ctxt.reportInputMismatch(Instant.class, | ||
| "Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF); | ||
| return null; // unreachable | ||
| } |
p.readValueAs(MessagePackExtensionType.class) can return null if the input is null or invalid. Adding a null check on ext prevents a potential NullPointerException when calling ext.getType().
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext == null) {
ctxt.reportInputMismatch(Instant.class, "Unexpected null value for Instant object");
return null;
}
if (ext.getType() != EXT_TYPE) {
ctxt.reportInputMismatch(Instant.class,
"Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF);
return null; // unreachable
}
@gemini-code-assist Declining — same reasoning as the earlier review: caller misuse, not a case we need to defensively guard against.
I understand your perspective regarding defensive programming in this context. Given that this is a library where the caller is expected to provide valid inputs and handle Jackson's DeserializationContext appropriately, I will defer to your design choice. I have no further concerns on this thread.
…ckGenerator endCurrentContainer() used currentParentElementIndex == 0 to detect a root container, which fails for any root container beyond the first. The second root node sits at index 1, so the check fell through to the nested-container path and called nodes.get(-1), causing an AssertionError/IndexOutOfBoundsException. Fix: use parent.parentIndex == -1 to detect root containers regardless of their position in the node list. Add testMultipleRootContainersWithoutFlush to cover consecutive root writes without an intervening flush().
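To illustrate why a position-based check misses the second root container, here is a minimal sketch. It assumes a flat node list in which each node records the index of its parent, with -1 meaning "no parent". The `Node` class and both helper methods are hypothetical stand-ins for illustration, not the generator's actual data structures.

```java
import java.util.ArrayList;
import java.util.List;

final class RootDetectionSketch {
    // Hypothetical simplified node: stores only the index of its parent (-1 = root).
    static final class Node {
        final int parentIndex;

        Node(int parentIndex) {
            this.parentIndex = parentIndex;
        }
    }

    // Position-based check: only true for the very first node in the list.
    static boolean isRootByPosition(int nodeIndex) {
        return nodeIndex == 0;
    }

    // Parent-based check: true for every node that has no parent.
    static boolean isRootByParent(List<Node> nodes, int nodeIndex) {
        return nodes.get(nodeIndex).parentIndex == -1;
    }

    // Two root containers written back to back, plus one child of the second root.
    static List<Node> sampleNodes() {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node(-1)); // index 0: first root container
        nodes.add(new Node(-1)); // index 1: second root container
        nodes.add(new Node(1));  // index 2: nested inside the second root
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = sampleNodes();
        System.out.println(isRootByPosition(1));      // false: second root is misclassified
        System.out.println(isRootByParent(nodes, 1)); // true
        System.out.println(isRootByParent(nodes, 2)); // false: genuinely nested
    }
}
```

In this sketch the position-based check would send the second root down the nested-container path, which is the failure mode the commit message describes.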
|
/gemini review |
Code Review
This pull request introduces the msgpack-jackson3 module, which adds support for Jackson 3.x and Java 17+ to the MessagePack library. It includes implementations of the core Jackson streaming and databind components, comprehensive unit tests, and JMH benchmarks. The review feedback highlights potential NullPointerException risks in MessagePackParser and MessagePackSerializedString when handling null values or null deserialization results, suggesting defensive null checks to ensure stability.
| if (isObjectValueSet) { | ||
| streamReadContext.setCurrentName(deserializedExtensionTypeValue().toString()); | ||
| nextToken = JsonToken.PROPERTY_NAME; | ||
| } |
If deserializedExtensionTypeValue() returns null (e.g., due to a custom deserializer returning null), calling .toString() directly on it will throw a NullPointerException. A defensive null check should be added here.
if (isObjectValueSet) {
Object deserialized = deserializedExtensionTypeValue();
streamReadContext.setCurrentName(deserialized == null ? null : deserialized.toString());
nextToken = JsonToken.PROPERTY_NAME;
}
@gemini-code-assist Declining — a custom deserializer returning null for an extension type used as a map key or string value is caller misuse. We don't defensively guard against every broken user-provided deserializer.
I understand your perspective. Given that the library's design philosophy assumes the caller provides valid deserializers, I will respect that decision. I have no further concerns regarding this thread.
| try { | ||
| return deserializedExtensionTypeValue().toString(); | ||
| } |
@gemini-code-assist Same as above — declining.
I understand your perspective, komamitsu. Given that deserializedExtensionTypeValue() is intended to return a non-null object in this context, the defensive check may indeed be unnecessary if the underlying implementation guarantees it. I will withdraw the suggestion.
| @Override | ||
| public String getValue() | ||
| { | ||
| return value.toString(); | ||
| } |
If value is null (which can happen when serializing a map with a null key), calling value.toString() will throw a NullPointerException. Handling null safely by returning "null" prevents this crash and allows the generator to correctly serialize it as a nil map key.
| @Override | |
| public String getValue() | |
| { | |
| return value.toString(); | |
| } | |
| @Override | |
| public String getValue() | |
| { | |
| return value == null ? "null" : value.toString(); | |
| } |
@gemini-code-assist Fixed — getValue() now returns null when value is null instead of NPE-ing. The null propagates through writeName(SerializableString) → writeName(String null) → addKeyNode(null) → packNonContainer(null) which correctly packs msgpack nil. Note: returning the string "null" as suggested would be semantically wrong — it would serialize the key as the string "null" rather than msgpack nil.
Thank you for the clarification, komamitsu. You are absolutely correct that returning the string "null" would change the semantic meaning to a literal string "null" rather than a MessagePack nil. Your implementation correctly propagates the null value to the underlying MessagePack packer, which is the desired behavior for nil map keys.
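The distinction between a null reference and the literal string "null" can be shown with plain JDK calls. The sketch below is only an illustration of that distinction: the class and method names are hypothetical, and it does not involve the MessagePack packer itself.

```java
import java.util.Objects;

final class NullKeySketch {
    // Mirrors the reviewer's suggestion: a null value becomes the four-character string "null".
    static String asLiteralString(Object value) {
        return String.valueOf(value);
    }

    // Mirrors the applied fix: a null value stays null, so a caller can still tell it apart.
    static String asNullable(Object value) {
        return Objects.toString(value, null);
    }

    public static void main(String[] args) {
        System.out.println(asLiteralString(null) == null); // false: it is the string "null"
        System.out.println(asNullable(null) == null);      // true: null is preserved
        System.out.println(asNullable("key"));             // key
    }
}
```

Only the second form leaves downstream code able to choose nil over a string key, which is the point made in the reply above.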
| default: | ||
| packNonContainer(value); | ||
| flushMessagePacker(); | ||
| break; |
@Copilot Fixed — addValueNode() now flushes any buffered-but-closed root container before packing a root-level scalar, preserving the correct write order. Added testRootScalarAfterClosedRootContainerPreservesOrder in MessagePackGeneratorTest to cover this.
| if (doubleValue < Long.MIN_VALUE || doubleValue > Long.MAX_VALUE) { | ||
| return _reportError("Numeric value (" + doubleValue + ") out of range for `long`"); | ||
| } |
@Copilot Fixed — changed doubleValue > Long.MAX_VALUE to doubleValue >= (double) Long.MAX_VALUE. Since (double) Long.MAX_VALUE rounds up to exactly 2^63, the original > let the value 2^63 through and silently cast it to Long.MAX_VALUE. Added testGetLongValueFromDoubleThatEqualsLongMaxValueRoundedUpThrows to cover the regression.
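The rounding behaviour behind this fix can be checked with the JDK alone. The following is a minimal sketch; `fitsInLong` is a hypothetical helper written for illustration and is not the parser's actual method.

```java
final class LongRangeCheckSketch {
    // Hypothetical helper: true only when the double can be cast to long without saturating.
    // The upper bound is exclusive because (double) Long.MAX_VALUE is exactly 2^63.
    static boolean fitsInLong(double d) {
        return d >= (double) Long.MIN_VALUE && d < (double) Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        double twoTo63 = Math.pow(2, 63);
        System.out.println((double) Long.MAX_VALUE == twoTo63); // true: 2^63 - 1 rounds up
        System.out.println(twoTo63 > Long.MAX_VALUE);           // false: why '>' let it through
        System.out.println((long) twoTo63 == Long.MAX_VALUE);   // true: the cast saturates
        System.out.println(fitsInLong(twoTo63));                // false: rejected by the strict bound
    }
}
```

The lower bound can stay inclusive because Long.MIN_VALUE is -2^63, which a double represents exactly.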
- MessagePackParser.getLongValue(): double upper-bound check used > instead of >=. (double)Long.MAX_VALUE rounds up to 2^63, so a double value of exactly 2^63 passed the check and was silently cast to Long.MAX_VALUE. Fix: use >= (double)Long.MAX_VALUE.
- MessagePackGenerator.addValueNode(): root scalars written after a closed-but-unflushed root container were packed immediately, reversing the serialized order. Fix: flush any pending root before packing.
- MessagePackSerializedString.getValue(): returned value.toString() unconditionally, NPE-ing when value is null (e.g. null map key via streaming API). Fix: return null when value is null.

Add tests for all three: testGetLongValueFromDoubleThatEqualsLongMaxValueRoundedUpThrows, testRootScalarAfterClosedRootContainerPreservesOrder, testNullSerializedStringKeyDoesNotThrowNpe.
|
/gemini review |
Code Review
This pull request introduces the msgpack-jackson3 module, adding support for Jackson 3.x and Java 17+ to MessagePack. The changes include migrating core components like MessagePackFactory, MessagePackGenerator, and MessagePackParser to the new tools.jackson namespace, along with adding benchmarks, tests, and a TimestampExtensionModule for Instant serialization. The review feedback highlights several critical improvement opportunities: addressing a potential memory leak in MessagePackParser by using WeakReference in the ThreadLocal cache, making the Java 17+ version check in build.sbt more robust for early-access releases, adding a defensive null check to prevent a potential NullPointerException in MessagePackParser.currentName(), and optimizing Instant serialization/deserialization by manually encoding/decoding timestamp bytes to eliminate heavy allocations.
| { | ||
| // Retained heap per idle thread: ~0.2 KB (MessageUnpacker with cleared input buffer). | ||
| // Negligible compared to Jackson's own per-thread buffer retention. | ||
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
Holding a strong reference to the input source (such as an InputStream) in a static final ThreadLocal can lead to severe memory leaks in long-running applications or thread-pooled environments (e.g., web servers), as the stream and its associated resources cannot be garbage collected even after the parser is closed.
Using a WeakReference for the cached source prevents this memory leak while still allowing same-stream reuse detection if the stream is kept alive by the calling application.
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); | |
| private static final ThreadLocal<Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
@gemini-code-assist Declining — strong references are deliberate. For byte[] sources and when AUTO_CLOSE_SOURCE is enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, keeping the strong reference is intentional for same-stream reuse detection — clearing it would cause reset() to discard the unpacker's read-ahead buffer and corrupt sequential reads. See testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, komamitsu. I understand that the strong reference is intentional to support same-stream reuse detection when AUTO_CLOSE_SOURCE is disabled, and that you have verified this behavior with testInputStreamSequentialReadsWithAutoCloseSourceDisabled. Given that this is a deliberate design choice for performance and functionality in specific use cases, I will withdraw my suggestion.
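The read-ahead concern in this thread can be reproduced with JDK streams alone. The sketch below uses `BufferedInputStream` as a stand-in for a reader that buffers ahead of what it returns. That substitution is an assumption made for illustration; `MessageUnpacker` itself is not involved, and the method names are hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class ReadAheadSketch {
    private static int readOne(InputStream in) {
        try {
            return in.read();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Both reads go through the same buffering wrapper, so buffered bytes are kept.
    static int[] readTwiceReusingBuffer(byte[] data) {
        InputStream buffered = new BufferedInputStream(new ByteArrayInputStream(data), 8);
        return new int[] {readOne(buffered), readOne(buffered)};
    }

    // Each read gets a fresh wrapper over the same source; the first wrapper has
    // already pulled every byte into its own buffer, which is then thrown away.
    static int[] readTwiceWithFreshBuffers(byte[] data) {
        InputStream src = new ByteArrayInputStream(data);
        int first = readOne(new BufferedInputStream(src, 8));
        int second = readOne(new BufferedInputStream(src, 8));
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(Arrays.toString(readTwiceReusingBuffer(data)));    // [1, 2]
        System.out.println(Arrays.toString(readTwiceWithFreshBuffers(data))); // [1, -1]
    }
}
```

Discarding the buffer between reads loses the bytes that were read ahead. The reply above attributes the same kind of loss to calling reset() when the cached source no longer matches.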
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | ||
| if (messageUnpackerTuple == null) { | ||
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | ||
| } | ||
| else { | ||
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | ||
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | ||
| // since it has buffer which has loaded the InputStream data ahead. | ||
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | ||
| Object cachedSrc = messageUnpackerTuple.first(); | ||
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | ||
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | ||
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | ||
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | ||
| messageUnpackerTuple.second().reset(input); | ||
| } | ||
| messageUnpacker = messageUnpackerTuple.second(); | ||
| } | ||
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); |
Update the constructor to retrieve and store the cached source using WeakReference to align with the thread-local memory leak fix.
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); | |
| Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first().get(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(src), messageUnpacker)); |
@gemini-code-assist Declining — strong references are deliberate. For byte[] sources and when AUTO_CLOSE_SOURCE is enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, keeping the strong reference is intentional for same-stream reuse detection — clearing it would cause reset() to discard the unpacker's read-ahead buffer and corrupt sequential reads. See testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, komamitsu. I understand that the strong references are intentional to support sequential stream reuse when AUTO_CLOSE_SOURCE is disabled, and that clearing them would indeed break that functionality. Given the context provided, your implementation is correct for this use case.
| Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get(); | ||
| if (tuple != null) { | ||
| if (tuple.first() instanceof byte[]) { | ||
| // close() calls ArrayBufferInput.close() which sets buffer = null, | ||
| // releasing the byte[] payload reference held by the unpacker's input. | ||
| // The unpacker itself is kept alive for reuse on the next parse. | ||
| tuple.second().close(); | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) { | ||
| // Stream is already closed above; release the reference so it doesn't | ||
| // linger on the thread until the next parse. | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } |
Update _closeInput() to retrieve and clear the cached source using WeakReference to align with the thread-local memory leak fix.
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
Object cachedSrc = tuple.first().get();
if (cachedSrc instanceof byte[]) {
// close() calls ArrayBufferInput.close() which sets buffer = null,
// releasing the byte[] payload reference held by the unpacker's input.
// The unpacker itself is kept alive for reuse on the next parse.
tuple.second().close();
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) {
// Stream is already closed above; release the reference so it doesn't
// linger on the thread until the next parse.
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
}
@gemini-code-assist Declining — strong references are deliberate. For byte[] sources and when AUTO_CLOSE_SOURCE is enabled, the source is cleared on close. For AUTO_CLOSE_SOURCE disabled, keeping the strong reference is intentional for same-stream reuse detection — clearing it would cause reset() to discard the unpacker's read-ahead buffer and corrupt sequential reads. See testInputStreamSequentialReadsWithAutoCloseSourceDisabled in MessagePackParserTest.
Thank you for the clarification, komamitsu. I understand that maintaining strong references is intentional to support same-stream reuse detection and prevent corruption of read-ahead buffers when AUTO_CLOSE_SOURCE is disabled. Your explanation regarding the behavior in testInputStreamSequentialReadsWithAutoCloseSourceDisabled confirms that this is a necessary design choice for the library's requirements.
| val isJava17Plus: Boolean = { | ||
| val v = sys.props.getOrElse("java.specification.version", "1.8") | ||
| // getOrElse(false): non-numeric versions (e.g. early-access "17-ea") fail safe | ||
| // by not compiling the jackson3 module rather than making an optimistic guess. | ||
| if (v.startsWith("1.")) false else scala.util.Try(v.toInt >= 17).getOrElse(false) | ||
| } |
The parsing of java.specification.version can fail to identify Java 17+ correctly on early-access releases (e.g., "17-ea"), as v.toInt will throw a NumberFormatException and default to false.
We can make this check more robust by extracting only the leading digits of the specification version.
val isJava17Plus: Boolean = {
val v = sys.props.getOrElse("java.specification.version", "1.8")
val majorStr = if (v.startsWith("1.")) v.drop(2) else v
val major = majorStr.takeWhile(_.isDigit)
scala.util.Try(major.toInt >= 17).getOrElse(false)
}
@gemini-code-assist Declining — the current behavior is intentional and documented in the comment on that line: early-access versions like "17-ea" fail safe by skipping the module rather than making an optimistic guess about compatibility.
I understand. Thank you for clarifying that the current behavior is intentional and documented to ensure a fail-safe mechanism for early-access Java versions. I will keep this in mind for future reviews.
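For readers more familiar with Java than sbt, the fail-safe behaviour discussed in this thread can be sketched as follows. This is a Java transliteration for illustration only; the build file itself is Scala, and `SpecVersionSketch` is a hypothetical name.

```java
final class SpecVersionSketch {
    // Returns true only for a purely numeric specification version of 17 or higher.
    // Legacy "1.x" versions and non-numeric strings such as "17-ea" yield false.
    static boolean isJava17Plus(String specVersion) {
        if (specVersion.startsWith("1.")) {
            return false;
        }
        try {
            return Integer.parseInt(specVersion) >= 17;
        }
        catch (NumberFormatException e) {
            return false; // fail safe: skip the module rather than guess
        }
    }

    public static void main(String[] args) {
        System.out.println(isJava17Plus("1.8"));   // false
        System.out.println(isJava17Plus("11"));    // false
        System.out.println(isJava17Plus("17"));    // true
        System.out.println(isJava17Plus("17-ea")); // false
    }
}
```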
| public String currentName() | ||
| { | ||
| // Simple, but need to look for START_OBJECT/ARRAY's "off-by-one" thing: | ||
| if (_currToken == JsonToken.START_OBJECT || _currToken == JsonToken.START_ARRAY) { | ||
| MessagePackReadContext parent = streamReadContext.getParent(); | ||
| return parent.currentName(); | ||
| } | ||
| return streamReadContext.currentName(); | ||
| } |
Add a defensive null check for parent to prevent potential NullPointerExceptions if streamReadContext.getParent() returns null under unexpected state transitions or custom subclassing.
| public String currentName() | |
| { | |
| // Simple, but need to look for START_OBJECT/ARRAY's "off-by-one" thing: | |
| if (_currToken == JsonToken.START_OBJECT || _currToken == JsonToken.START_ARRAY) { | |
| MessagePackReadContext parent = streamReadContext.getParent(); | |
| return parent.currentName(); | |
| } | |
| return streamReadContext.currentName(); | |
| } | |
| @Override | |
| public String currentName() | |
| { | |
| // Simple, but need to look for START_OBJECT/ARRAY's "off-by-one" thing: | |
| if (_currToken == JsonToken.START_OBJECT || _currToken == JsonToken.START_ARRAY) { | |
| MessagePackReadContext parent = streamReadContext.getParent(); | |
| return parent == null ? null : parent.currentName(); | |
| } | |
| return streamReadContext.currentName(); | |
| } |
@gemini-code-assist Declining — streamReadContext.getParent() cannot return null in normal usage here. When _currToken == START_OBJECT/ARRAY, _nextToken() has already advanced streamReadContext to the child context, so getParent() returns the (non-null) parent. The NPE path only exists under internal state corruption or unusual subclassing.
Thank you for the clarification, komamitsu. I understand that streamReadContext.getParent() is guaranteed to be non-null in this specific code path due to the state machine's advancement before this method is called. I agree that adding a null check here would be redundant under normal operation.
| @Override | ||
| public void serialize(Instant value, JsonGenerator gen, SerializationContext provider) | ||
| { | ||
| try { | ||
| // Per-call allocation is a known limitation carried from the v2 module. | ||
| // Manually encoding the timestamp bytes would avoid it but duplicates | ||
| // msgpack-core's timestamp logic. Tracked as a future optimization. | ||
| ByteArrayOutputStream os = new ByteArrayOutputStream(); | ||
| try (MessagePacker packer = MessagePack.newDefaultPacker(os)) { | ||
| packer.packTimestamp(value); | ||
| } | ||
| try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(os.toByteArray())) { | ||
| ExtensionTypeHeader header = unpacker.unpackExtensionTypeHeader(); | ||
| byte[] bytes = unpacker.readPayload(header.getLength()); | ||
| MessagePackExtensionType extensionType = new MessagePackExtensionType(EXT_TYPE, bytes); | ||
| gen.writePOJO(extensionType); | ||
| } | ||
| } | ||
| catch (IOException e) { | ||
| throw _wrapIOFailure(provider, e); | ||
| } | ||
| } |
The current implementation of InstantSerializer is highly inefficient as it performs multiple heavy allocations per serialization call (including ByteArrayOutputStream, MessagePacker, MessageUnpacker, and multiple intermediate byte arrays).
We can manually encode the timestamp bytes directly to completely eliminate these allocations, drastically improving performance and reducing GC pressure.
@Override
public void serialize(Instant value, JsonGenerator gen, SerializationContext provider)
{
try {
long sec = value.getEpochSecond();
int nano = value.getNano();
byte[] bytes;
if ((sec >>> 34) == 0) {
if (nano == 0 && (sec >>> 32) == 0) {
bytes = new byte[4];
bytes[0] = (byte) ((sec >> 24) & 0xFF);
bytes[1] = (byte) ((sec >> 16) & 0xFF);
bytes[2] = (byte) ((sec >> 8) & 0xFF);
bytes[3] = (byte) (sec & 0xFF);
}
else {
bytes = new byte[8];
long val = ((long) nano << 34) | sec;
bytes[0] = (byte) ((val >> 56) & 0xFF);
bytes[1] = (byte) ((val >> 48) & 0xFF);
bytes[2] = (byte) ((val >> 40) & 0xFF);
bytes[3] = (byte) ((val >> 32) & 0xFF);
bytes[4] = (byte) ((val >> 24) & 0xFF);
bytes[5] = (byte) ((val >> 16) & 0xFF);
bytes[6] = (byte) ((val >> 8) & 0xFF);
bytes[7] = (byte) (val & 0xFF);
}
}
else {
bytes = new byte[12];
bytes[0] = (byte) ((nano >> 24) & 0xFF);
bytes[1] = (byte) ((nano >> 16) & 0xFF);
bytes[2] = (byte) ((nano >> 8) & 0xFF);
bytes[3] = (byte) (nano & 0xFF);
bytes[4] = (byte) ((sec >> 56) & 0xFF);
bytes[5] = (byte) ((sec >> 48) & 0xFF);
bytes[6] = (byte) ((sec >> 40) & 0xFF);
bytes[7] = (byte) ((sec >> 32) & 0xFF);
bytes[8] = (byte) ((sec >> 24) & 0xFF);
bytes[9] = (byte) ((sec >> 16) & 0xFF);
bytes[10] = (byte) ((sec >> 8) & 0xFF);
bytes[11] = (byte) (sec & 0xFF);
}
MessagePackExtensionType extensionType = new MessagePackExtensionType(EXT_TYPE, bytes);
gen.writePOJO(extensionType);
}
catch (IOException e) {
throw _wrapIOFailure(provider, e);
}
}
@gemini-code-assist Per-call allocation is a known limitation already noted in a comment in the code. This is an optional extension module, not a hot path. Declining in favour of keeping the implementation simple.
I understand your perspective. Since this is an optional extension module and not a performance-critical hot path, maintaining code simplicity and consistency with the existing implementation is a reasonable trade-off. I will mark this thread as resolved.
| @Override | ||
| public Instant deserialize(JsonParser p, DeserializationContext ctxt) | ||
| { | ||
| try { | ||
| // Per-call allocation is a known limitation — see serialize() above. | ||
| MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class); | ||
| if (ext.getType() != EXT_TYPE) { | ||
| ctxt.reportInputMismatch(Instant.class, | ||
| "Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF); | ||
| return null; // unreachable | ||
| } | ||
| try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(ext.getData())) { | ||
| return unpacker.unpackTimestamp(new ExtensionTypeHeader(EXT_TYPE, ext.getData().length)); | ||
| } | ||
| } | ||
| catch (IOException e) { | ||
| throw _wrapIOFailure(ctxt, e); | ||
| } | ||
| } |
As with InstantSerializer, InstantDeserializer can decode the timestamp bytes manually, which avoids allocating a MessageUnpacker and intermediate stream wrappers on every deserialization call.
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt)
{
try {
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext.getType() != EXT_TYPE) {
ctxt.reportInputMismatch(Instant.class,
"Unexpected extension type (0x%X) for Instant object", ext.getType() & 0xFF);
return null; // unreachable
}
byte[] bytes = ext.getData();
long sec;
int nano;
if (bytes.length == 4) {
sec = ((long) (bytes[0] & 0xFF) << 24)
| ((long) (bytes[1] & 0xFF) << 16)
| ((long) (bytes[2] & 0xFF) << 8)
| (long) (bytes[3] & 0xFF);
nano = 0;
}
else if (bytes.length == 8) {
long val = ((long) (bytes[0] & 0xFF) << 56)
| ((long) (bytes[1] & 0xFF) << 48)
| ((long) (bytes[2] & 0xFF) << 40)
| ((long) (bytes[3] & 0xFF) << 32)
| ((long) (bytes[4] & 0xFF) << 24)
| ((long) (bytes[5] & 0xFF) << 16)
| ((long) (bytes[6] & 0xFF) << 8)
| (long) (bytes[7] & 0xFF);
nano = (int) (val >>> 34);
sec = val & 0x00000003FFFFFFFFL;
}
else if (bytes.length == 12) {
nano = ((bytes[0] & 0xFF) << 24)
| ((bytes[1] & 0xFF) << 16)
| ((bytes[2] & 0xFF) << 8)
| (bytes[3] & 0xFF);
sec = ((long) (bytes[4] & 0xFF) << 56)
| ((long) (bytes[5] & 0xFF) << 48)
| ((long) (bytes[6] & 0xFF) << 40)
| ((long) (bytes[7] & 0xFF) << 32)
| ((long) (bytes[8] & 0xFF) << 24)
| ((long) (bytes[9] & 0xFF) << 16)
| ((long) (bytes[10] & 0xFF) << 8)
| (long) (bytes[11] & 0xFF);
}
else {
throw new IllegalArgumentException("Unexpected timestamp length: " + bytes.length);
}
return Instant.ofEpochSecond(sec, nano);
}
catch (IOException e) {
throw _wrapIOFailure(ctxt, e);
}
}
@gemini-code-assist Per-call allocation is a known limitation already noted in a comment in the code. This is an optional extension module, not a hot path. Declining in favour of keeping the implementation simple.
I understand your perspective. Since this is an optional extension module and not a hot path, keeping the implementation simple is a reasonable trade-off. I will mark this thread as resolved.
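For reference, the three payload layouts that the manual encoding and decoding suggestions target (4, 8, and 12 bytes, as defined for the MessagePack timestamp extension type) can be checked with a small standalone round trip. This is only a sketch under stated assumptions, not the module's code: the class name `TimestampLayoutSketch` and its methods are hypothetical, and it uses `java.nio.ByteBuffer` (big-endian by default) in place of the manual shifts.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

// Hypothetical helper, not part of msgpack-jackson3: illustrates the payload
// layouts of the MessagePack timestamp extension. Only the payload is handled;
// the extension header is out of scope here.
final class TimestampLayoutSketch {
    private TimestampLayoutSketch() {}

    static byte[] encode(Instant value) {
        long sec = value.getEpochSecond();
        int nano = value.getNano();
        if ((sec >>> 34) == 0) {
            if (nano == 0 && (sec >>> 32) == 0) {
                // timestamp 32: unsigned 32-bit seconds
                return ByteBuffer.allocate(4).putInt((int) sec).array();
            }
            // timestamp 64: 30-bit nanoseconds in the high bits, 34-bit seconds below
            return ByteBuffer.allocate(8).putLong(((long) nano << 34) | sec).array();
        }
        // timestamp 96: 32-bit nanoseconds followed by signed 64-bit seconds
        return ByteBuffer.allocate(12).putInt(nano).putLong(sec).array();
    }

    static Instant decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        switch (bytes.length) {
            case 4:
                // Mask to read the int as an unsigned value
                return Instant.ofEpochSecond(buf.getInt() & 0xFFFFFFFFL);
            case 8: {
                long val = buf.getLong();
                return Instant.ofEpochSecond(val & 0x3FFFFFFFFL, val >>> 34);
            }
            case 12: {
                int nano = buf.getInt();
                long sec = buf.getLong();
                return Instant.ofEpochSecond(sec, nano);
            }
            default:
                throw new IllegalArgumentException("Unexpected timestamp length: " + bytes.length);
        }
    }

    public static void main(String[] args) {
        Instant[] samples = {
            Instant.ofEpochSecond(1_700_000_000L),              // fits timestamp 32
            Instant.ofEpochSecond(1_700_000_000L, 123_456_789), // needs timestamp 64
            Instant.ofEpochSecond(-1L, 999_999_999),            // needs timestamp 96
        };
        for (Instant sample : samples) {
            byte[] payload = encode(sample);
            System.out.println(payload.length + " bytes, round trip ok: "
                    + sample.equals(decode(payload)));
        }
    }
}
```

A round trip like this could also serve as a regression check if the manual encoding were ever adopted, since it exercises the boundary between the three layouts.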
| this.reuseResourceInGenerator = b.reuseResourceInGenerator(); | ||
| this.reuseResourceInParser = b.reuseResourceInParser(); | ||
| this.supportIntegerKeys = b.supportIntegerKeys(); | ||
| this.extTypeCustomDesers = b.extTypeCustomDesers(); |
@Copilot Acknowledged — the copy constructor correctly clones extTypeCustomDesers but the builder constructor stores it by reference. Post-build mutation of the builder's registry would affect the already-built factory. This is a known inconsistency; we accept it as a low-priority issue since the pattern is unusual in practice.
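The inconsistency acknowledged here can be illustrated with a small standalone model. The names `Builder`, `AliasingFactory`, and `CopyingFactory` are hypothetical stand-ins, not the module's classes, and the registry is simplified to a plain map; the sketch only shows how storing a builder's mutable registry by reference differs from copying it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the builder/factory relationship discussed above.
final class BuilderAliasingSketch {
    private BuilderAliasingSketch() {}

    // Builder holding a mutable registry (ext type -> deserializer name).
    static final class Builder {
        final Map<Byte, String> registry = new HashMap<>();
    }

    // Stores the builder's map by reference: later builder mutations leak in.
    static final class AliasingFactory {
        final Map<Byte, String> registry;

        AliasingFactory(Builder b) {
            this.registry = b.registry;
        }
    }

    // Takes a snapshot: the built factory is isolated from the builder.
    static final class CopyingFactory {
        final Map<Byte, String> registry;

        CopyingFactory(Builder b) {
            this.registry = new HashMap<>(b.registry);
        }
    }

    // Returns {aliasing size, copying size} after the builder is mutated post-build.
    static int[] sizesAfterPostBuildMutation() {
        Builder b = new Builder();
        b.registry.put((byte) 1, "first");
        AliasingFactory aliasing = new AliasingFactory(b);
        CopyingFactory copying = new CopyingFactory(b);
        b.registry.put((byte) 2, "added after build");
        return new int[] {aliasing.registry.size(), copying.registry.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterPostBuildMutation();
        System.out.println("aliasing sees " + sizes[0] + " entries, copying sees " + sizes[1]);
    }
}
```

Copying in the constructor costs one extra map allocation per build, which is the usual trade-off against the aliasing behavior.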
| else { | ||
| nodes.add(new NodeArray(currentParentElementIndex)); | ||
| } |
@Copilot Fixed — writeStartArray()/writeStartObject() now call flush() before adding a new root-level node when isElementsClosed is true, mirroring the same guard already in place for root scalars. testFlushMidWriteOnSecondRootContainerDoesNotCorruptState in MessagePackGeneratorTest covers this.
| else { | ||
| nodes.add(new NodeObject(currentParentElementIndex)); | ||
| } |
@Copilot Fixed — writeStartArray()/writeStartObject() now call flush() before adding a new root-level node when isElementsClosed is true, mirroring the same guard already in place for root scalars. testFlushMidWriteOnSecondRootContainerDoesNotCorruptState in MessagePackGeneratorTest covers this.
After the first root container closes, isElementsClosed=true. Opening a second root array/object did not reset this flag, so a flush() call while the second container was still open would pack the incomplete node tree and clear nodes[], causing IndexOutOfBoundsException on subsequent writes. Fix: flush the previous closed root before adding a new root-level node in writeStartArray/writeStartObject, mirroring the same guard already applied in addValueNode() for root scalars. Add testFlushMidWriteOnSecondRootContainerDoesNotCorruptState to cover both array and object variants.
|
/gemini review |
|
/gemini review |
Code Review
This pull request introduces the msgpack-jackson3 module, porting the MessagePack data format extension to Jackson 3.x with Java 17+ support, along with JMH benchmarks and build configuration updates. The review feedback highlights critical memory leak and thread-safety issues in the ThreadLocal resource reuse mechanism. Specifically, it is recommended to wrap the parser's source object in a WeakReference to prevent stream leaks, and to store the generator's OutputStreamBufferOutput in an instance field to ensure safe cleanup during _releaseBuffers() even when closed from a different thread.
| { | ||
| // Retained heap per idle thread: ~0.2 KB (MessageUnpacker with cleared input buffer). | ||
| // Negligible compared to Jackson's own per-thread buffer retention. | ||
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
Using a strong reference to the source object (such as an InputStream) in a ThreadLocal can lead to severe memory leaks in long-running pooled-thread environments (like web/application servers) if the stream is not closed or if AUTO_CLOSE_SOURCE is disabled. Wrapping the source in a WeakReference prevents these leaks while still allowing same-stream reuse detection as long as the caller holds a reference to the stream.
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); | |
| private static final ThreadLocal<Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
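The pattern behind this suggestion can be sketched in isolation. The class `WeakSourceCacheSketch` below is hypothetical and deliberately leaves out the unpacker; it only shows a per-thread cache that remembers the last source by identity without keeping it strongly reachable from the thread.

```java
import java.lang.ref.WeakReference;

// Hypothetical illustration of caching the "last source" per thread via a
// WeakReference, so the ThreadLocal itself does not keep the source alive.
final class WeakSourceCacheSketch {
    private static final ThreadLocal<WeakReference<Object>> LAST_SOURCE = new ThreadLocal<>();

    private WeakSourceCacheSketch() {}

    // Identity comparison, matching the `cachedSrc != src` check in the parser.
    static boolean isSameSourceAsLast(Object src) {
        WeakReference<Object> ref = LAST_SOURCE.get();
        // ref.get() returns null once the referent has been cleared or collected.
        return src != null && ref != null && ref.get() == src;
    }

    static void remember(Object src) {
        LAST_SOURCE.set(new WeakReference<>(src));
    }

    static void forget() {
        LAST_SOURCE.remove();
    }

    public static void main(String[] args) {
        Object first = new Object();
        Object second = new Object();
        remember(first);
        System.out.println("same as first: " + isSameSourceAsLast(first));
        System.out.println("same as second: " + isSameSourceAsLast(second));
        forget();
        System.out.println("after forget: " + isSameSourceAsLast(first));
    }
}
```

The sketch covers only the cached key. Whether the reused unpacker still holds its own strong reference to the stream is a separate question that this example does not model.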
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | ||
| if (messageUnpackerTuple == null) { | ||
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | ||
| } | ||
| else { | ||
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | ||
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | ||
| // since it has buffer which has loaded the InputStream data ahead. | ||
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | ||
| Object cachedSrc = messageUnpackerTuple.first(); | ||
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | ||
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | ||
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | ||
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | ||
| messageUnpackerTuple.second().reset(input); | ||
| } | ||
| messageUnpacker = messageUnpackerTuple.second(); | ||
| } | ||
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); | ||
| ownsThreadLocalUnpacker = true; |
Update the constructor to retrieve and store the source object wrapped in a WeakReference to match the updated ThreadLocal type and prevent memory leaks.
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
    messageUnpacker = MessagePack.newDefaultUnpacker(input);
}
else {
    // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE,
    // MessagePackParser needs to use the MessageUnpacker that has the same InputStream
    // since it has buffer which has loaded the InputStream data ahead.
    // However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
    Object cachedSrc = messageUnpackerTuple.first().get();
    if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) {
        // reset() replaces the internal MessageBufferInput and clears the unpacker's
        // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes
        // unreachable here (we discard the return value), so its byte[] is GC-eligible.
        messageUnpackerTuple.second().reset(input);
    }
    messageUnpacker = messageUnpackerTuple.second();
}
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(src), messageUnpacker));
ownsThreadLocalUnpacker = true;
| if (ownsThreadLocalUnpacker) { | ||
| Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get(); | ||
| if (tuple != null) { | ||
| if (tuple.first() instanceof byte[]) { | ||
| // close() calls ArrayBufferInput.close() which sets buffer = null, | ||
| // releasing the byte[] payload reference held by the unpacker's input. | ||
| // The unpacker itself is kept alive for reuse on the next parse. | ||
| tuple.second().close(); | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) { | ||
| // Stream is already closed above; release the reference so it doesn't | ||
| // linger on the thread until the next parse. | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| // else: InputStream with AUTO_CLOSE_SOURCE disabled — keep the reference | ||
| // so the next parse on the same thread can detect same-stream reuse and | ||
| // avoid resetting the unpacker (which would discard its read-ahead buffer). | ||
| } | ||
| } |
Update _closeInput() to correctly extract the referent from the WeakReference before performing the instanceof byte[] check and clearing the reference.
if (ownsThreadLocalUnpacker) {
    Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> tuple = messageUnpackerHolder.get();
    if (tuple != null) {
        Object referent = tuple.first().get();
        if (referent instanceof byte[]) {
            // close() calls ArrayBufferInput.close() which sets buffer = null,
            // releasing the byte[] payload reference held by the unpacker's input.
            // The unpacker itself is kept alive for reuse on the next parse.
            tuple.second().close();
            messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
        }
        else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) {
            // Stream is already closed above; release the reference so it doesn't
            // linger on the thread until the next parse.
            messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
        }
        // else: InputStream with AUTO_CLOSE_SOURCE disabled; keep the reference
        // so the next parse on the same thread can detect same-stream reuse and
        // avoid resetting the unpacker (which would discard its read-ahead buffer).
    }
}
| private MessagePackWriteContext writeContext; | ||
| private final boolean ownsThreadLocalBuffer; |
Add an instance field to store the OutputStreamBufferOutput reference. This allows the generator to safely reset the buffer on close even if the close operation is executed on a different thread than the one that created the generator.
private MessagePackWriteContext writeContext;
private final boolean ownsThreadLocalBuffer;
private OutputStreamBufferOutput messageBufferOutput;
| public MessagePackGenerator( | ||
| ObjectWriteContext writeCtxt, | ||
| IOContext ioCtxt, | ||
| int streamWriteFeatures, | ||
| OutputStream out, | ||
| MessagePack.PackerConfig packerConfig, | ||
| boolean reuseResourceInGenerator, | ||
| boolean supportIntegerKeys) | ||
| throws IOException | ||
| { | ||
| super(writeCtxt, ioCtxt, streamWriteFeatures); | ||
| this.output = out; | ||
| this.messagePacker = packerConfig.newPacker(getMessageBufferOutputForOutputStream(out, reuseResourceInGenerator)); | ||
| this.packerConfig = packerConfig; | ||
| this.nodes = new ArrayList<>(); | ||
| this.supportIntegerKeys = supportIntegerKeys; | ||
| this.writeContext = MessagePackWriteContext.createRootContext( | ||
| StreamWriteFeature.STRICT_DUPLICATE_DETECTION.enabledIn(streamWriteFeatures) | ||
| ? DupDetector.rootDetector(this) : null); | ||
| this.ownsThreadLocalBuffer = reuseResourceInGenerator; | ||
| } |
Initialize the newly added messageBufferOutput instance field in the constructor.
public MessagePackGenerator(
        ObjectWriteContext writeCtxt,
        IOContext ioCtxt,
        int streamWriteFeatures,
        OutputStream out,
        MessagePack.PackerConfig packerConfig,
        boolean reuseResourceInGenerator,
        boolean supportIntegerKeys)
        throws IOException
{
    super(writeCtxt, ioCtxt, streamWriteFeatures);
    this.output = out;
    this.messageBufferOutput = getMessageBufferOutputForOutputStream(out, reuseResourceInGenerator);
    this.messagePacker = packerConfig.newPacker(this.messageBufferOutput);
    this.packerConfig = packerConfig;
    this.nodes = new ArrayList<>();
    this.supportIntegerKeys = supportIntegerKeys;
    this.writeContext = MessagePackWriteContext.createRootContext(
            StreamWriteFeature.STRICT_DUPLICATE_DETECTION.enabledIn(streamWriteFeatures)
                    ? DupDetector.rootDetector(this) : null);
    this.ownsThreadLocalBuffer = reuseResourceInGenerator;
}
| protected void _releaseBuffers() | ||
| { | ||
| // No null check on get(): generators are single-threaded by contract so this | ||
| // ThreadLocal is always set on the calling thread. A null here would indicate | ||
| // cross-thread misuse; letting it NPE surfaces that bug immediately. | ||
| if (ownsThreadLocalBuffer) { | ||
| OutputStreamBufferOutput buf = messageBufferOutputHolder.get(); | ||
| if (buf != null) { | ||
| try { | ||
| buf.reset(null); | ||
| } | ||
| catch (IOException e) { | ||
| throw _wrapIOFailure(e); | ||
| } | ||
| } | ||
| } | ||
| } |
Update _releaseBuffers() to reset the messageBufferOutput instance field directly. This avoids potential memory leaks of the OutputStream if the generator is closed on a different thread, as it bypasses the ThreadLocal lookup which would return null on other threads.
@Override
protected void _releaseBuffers()
{
    if (ownsThreadLocalBuffer && messageBufferOutput != null) {
        try {
            messageBufferOutput.reset(null);
        }
        catch (IOException e) {
            throw _wrapIOFailure(e);
        }
    }
}
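The cross-thread concern raised in this thread can be reproduced without Jackson at all. The following is a minimal sketch, with the hypothetical class `CrossThreadCloseSketch` standing in for a generator that caches a buffer per thread (a `StringBuilder` is used as a placeholder for the real buffer type). It shows that a `ThreadLocal` lookup from a second thread does not find the creating thread's buffer, while an instance field does.

```java
// Hypothetical stand-in for a generator that reuses a per-thread buffer.
final class CrossThreadCloseSketch {
    private static final ThreadLocal<StringBuilder> HOLDER = new ThreadLocal<>();

    // Instance field: reachable from any thread that holds this object.
    private final StringBuilder buffer;

    CrossThreadCloseSketch() {
        StringBuilder cached = HOLDER.get();
        if (cached == null) {
            cached = new StringBuilder();
            HOLDER.set(cached);
        }
        this.buffer = cached;
    }

    // True only on the thread whose ThreadLocal slot holds this buffer.
    boolean visibleViaThreadLocal() {
        return HOLDER.get() == buffer;
    }

    // True on every thread, because the field travels with the object.
    boolean visibleViaField() {
        return buffer != null;
    }

    // Returns {seen via ThreadLocal, seen via field} as observed from another thread.
    static boolean[] probeFromOtherThread() {
        CrossThreadCloseSketch owner = new CrossThreadCloseSketch();
        boolean[] seen = new boolean[2];
        Thread other = new Thread(() -> {
            seen[0] = owner.visibleViaThreadLocal();
            seen[1] = owner.visibleViaField();
        });
        other.start();
        try {
            other.join(); // join() makes the writes to `seen` visible here
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = probeFromOtherThread();
        System.out.println("via ThreadLocal: " + seen[0] + ", via field: " + seen[1]);
    }
}
```

Whether closing a generator from another thread should be supported at all is a design decision for the module; the sketch only demonstrates the lookup behavior the suggestion relies on.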
Code Review
This pull request introduces the msgpack-jackson3 module, which adds Jackson 3.x support for MessagePack serialization and deserialization, including custom extension types and a timestamp extension module. The review feedback identifies several critical issues: potential memory leaks in MessagePackParser due to strong references to source objects held in a ThreadLocal when AUTO_CLOSE_SOURCE is disabled, a logic bug in MessagePackGenerator.writeNumber causing NaN/Infinity parsing to incorrectly throw a NumberFormatException, and multiple potential NullPointerExceptions when handling null values from custom extension deserializers or read operations.
| { | ||
| // Retained heap per idle thread: ~0.2 KB (MessageUnpacker with cleared input buffer). | ||
| // Negligible compared to Jackson's own per-thread buffer retention. | ||
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
The messageUnpackerHolder ThreadLocal stores a Tuple containing a strong reference to the source object (src, which can be an InputStream). When AUTO_CLOSE_SOURCE is disabled, this reference is never cleared or released, leading to a memory leak of the InputStream (and any associated buffers/resources) on the thread. Using a WeakReference for the source object in the Tuple prevents this memory leak while still allowing same-stream reuse detection.
| private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); | |
| private static final ThreadLocal<Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker>> messageUnpackerHolder = new ThreadLocal<>(); |
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | ||
| if (messageUnpackerTuple == null) { | ||
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | ||
| } | ||
| else { | ||
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | ||
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | ||
| // since it has buffer which has loaded the InputStream data ahead. | ||
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | ||
| Object cachedSrc = messageUnpackerTuple.first(); | ||
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | ||
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | ||
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | ||
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | ||
| messageUnpackerTuple.second().reset(input); | ||
| } | ||
| messageUnpacker = messageUnpackerTuple.second(); | ||
| } | ||
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); | ||
| ownsThreadLocalUnpacker = true; |
Update the constructor to use WeakReference for the cached source object to prevent memory leaks when AUTO_CLOSE_SOURCE is disabled.
| Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(src, messageUnpacker)); | |
| ownsThreadLocalUnpacker = true; | |
| Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get(); | |
| if (messageUnpackerTuple == null) { | |
| messageUnpacker = MessagePack.newDefaultUnpacker(input); | |
| } | |
| else { | |
| // Considering to reuse InputStream with StreamReadFeature.AUTO_CLOSE_SOURCE, | |
| // MessagePackParser needs to use the MessageUnpacker that has the same InputStream | |
| // since it has buffer which has loaded the InputStream data ahead. | |
| // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. | |
| Object cachedSrc = messageUnpackerTuple.first().get(); | |
| if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(streamReadFeatures) || cachedSrc != src || src instanceof byte[]) { | |
| // reset() replaces the internal MessageBufferInput and clears the unpacker's | |
| // internal read buffer to EMPTY_BUFFER. The old ArrayBufferInput becomes | |
| // unreachable here (we discard the return value), so its byte[] is GC-eligible. | |
| messageUnpackerTuple.second().reset(input); | |
| } | |
| messageUnpacker = messageUnpackerTuple.second(); | |
| } | |
| messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(src), messageUnpacker)); | |
| ownsThreadLocalUnpacker = true; |
| if (ownsThreadLocalUnpacker) { | ||
| Tuple<Object, MessageUnpacker> tuple = messageUnpackerHolder.get(); | ||
| if (tuple != null) { | ||
| if (tuple.first() instanceof byte[]) { | ||
| // close() calls ArrayBufferInput.close() which sets buffer = null, | ||
| // releasing the byte[] payload reference held by the unpacker's input. | ||
| // The unpacker itself is kept alive for reuse on the next parse. | ||
| tuple.second().close(); | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) { | ||
| // Stream is already closed above; release the reference so it doesn't | ||
| // linger on the thread until the next parse. | ||
| messageUnpackerHolder.set(new Tuple<>(null, tuple.second())); | ||
| } | ||
| // else: InputStream with AUTO_CLOSE_SOURCE disabled — keep the reference | ||
| // so the next parse on the same thread can detect same-stream reuse and | ||
| // avoid resetting the unpacker (which would discard its read-ahead buffer). | ||
| } |
Update _closeInput() to use WeakReference for the cached source object to prevent memory leaks when AUTO_CLOSE_SOURCE is disabled.
if (ownsThreadLocalUnpacker) {
Tuple<java.lang.ref.WeakReference<Object>, MessageUnpacker> tuple = messageUnpackerHolder.get();
if (tuple != null) {
Object cachedSrc = tuple.first().get();
if (cachedSrc instanceof byte[]) {
// close() calls ArrayBufferInput.close() which sets buffer = null,
// releasing the byte[] payload reference held by the unpacker's input.
// The unpacker itself is kept alive for reuse on the next parse.
tuple.second().close();
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
else if (StreamReadFeature.AUTO_CLOSE_SOURCE.enabledIn(_streamReadFeatures)) {
// Stream is already closed above; release the reference so it doesn't
// linger on the thread until the next parse.
messageUnpackerHolder.set(new Tuple<>(new java.lang.ref.WeakReference<>(null), tuple.second()));
}
// else: InputStream with AUTO_CLOSE_SOURCE disabled — keep the reference
// so the next parse on the same thread can detect same-stream reuse and
// avoid resetting the unpacker (which would discard its read-ahead buffer).
}
}
| catch (NumberFormatException e) { | ||
| // Fall back for NaN, Infinity, -Infinity which BigDecimal rejects. | ||
| try { | ||
| double d = Double.parseDouble(encodedValue); | ||
| addValueNode(d); | ||
| } | ||
| catch (NumberFormatException ignored) { | ||
| } | ||
| } |
In writeNumber(String encodedValue), if BigDecimal parsing fails (which happens for "NaN", "Infinity", and "-Infinity"), the code falls back to Double.parseDouble(encodedValue). If this succeeds, it calls addValueNode(d) but fails to return this. As a result, it falls through to throw new NumberFormatException(encodedValue);, meaning parsing NaN/Infinity will always throw a NumberFormatException even though it was successfully parsed and added.
| catch (NumberFormatException e) { | |
| // Fall back for NaN, Infinity, -Infinity which BigDecimal rejects. | |
| try { | |
| double d = Double.parseDouble(encodedValue); | |
| addValueNode(d); | |
| } | |
| catch (NumberFormatException ignored) { | |
| } | |
| } | |
| catch (NumberFormatException e) { | |
| // Fall back for NaN, Infinity, -Infinity which BigDecimal rejects. | |
| try { | |
| double d = Double.parseDouble(encodedValue); | |
| addValueNode(d); | |
| return this; | |
| } | |
| catch (NumberFormatException ignored) { | |
| } | |
| } |
| if (isObjectValueSet) { | ||
| streamReadContext.setCurrentName(deserializedExtensionTypeValue().toString()); | ||
| nextToken = JsonToken.PROPERTY_NAME; | ||
| } |
If a custom extension deserializer returns null, calling .toString() on it directly will throw a NullPointerException. Adding a null check prevents this.
if (isObjectValueSet) {
Object deserialized = deserializedExtensionTypeValue();
streamReadContext.setCurrentName(deserialized == null ? null : deserialized.toString());
nextToken = JsonToken.PROPERTY_NAME;
}
| case EXT: | ||
| try { | ||
| return deserializedExtensionTypeValue().toString(); | ||
| } | ||
| catch (IOException e) { | ||
| throw _wrapIOFailure(e); | ||
| } |
If a custom extension deserializer returns null, calling .toString() on it directly will throw a NullPointerException. Adding a null check prevents this.
| case EXT: | |
| try { | |
| return deserializedExtensionTypeValue().toString(); | |
| } | |
| catch (IOException e) { | |
| throw _wrapIOFailure(e); | |
| } | |
| case EXT: | |
| try { | |
| Object deserialized = deserializedExtensionTypeValue(); | |
| return deserialized == null ? null : deserialized.toString(); | |
| } | |
| catch (IOException e) { | |
| throw _wrapIOFailure(e); | |
| } |
| return null; // unreachable | ||
| } | ||
| try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(ext.getData())) { |
If the input contains a null value or is otherwise empty, p.readValueAs(MessagePackExtensionType.class) can return null. Calling ext.getType() directly on a potentially null ext will throw a NullPointerException. Adding a null check prevents this.
MessagePackExtensionType ext = p.readValueAs(MessagePackExtensionType.class);
if (ext == null) {
return null;
}
if (ext.getType() != EXT_TYPE) {
| ObjectMapper objectMapper = new MessagePackMapper(); | ||
| objectMapper.disable(StreamWriteFeature.AUTO_CLOSE_TARGET); |
| ObjectMapper objectMapper = new MessagePackMapper(); | ||
| objectMapper.disable(StreamReadFeature.AUTO_CLOSE_SOURCE); |
| ObjectMapper objectMapper = new MessagePackMapper() | ||
| .registerModule(TimestampExtensionModule.INSTANCE); |
| ObjectMapper objectMapper = new ObjectMapper( | ||
| new MessagePackFactory().setExtTypeCustomDesers(extTypeCustomDesers)) | ||
| .registerModule(module); |
| ObjectMapper objectMapper = new ObjectMapper( | ||
| new MessagePackFactory().setExtTypeCustomDesers(extTypeCustomDesers)) | ||
| .registerModule(module); |
| } | ||
| } | ||
|
|
||
| private JsonToken _nextToken() throws IOException |
Jackson 3 ObjectMapper is configured via the builder, not via mutable methods. Replace objectMapper.disable() and objectMapper.registerModule() calls with the corresponding builder equivalents:
- disable() → MapperBuilder.disable()
- registerModule() → MapperBuilder.addModule()
|
Closing to reopen as a single squashed commit on the upstream repo. |
Summary
- msgpack-jackson3 submodule (jackson-dataformat-msgpack3) that ports the existing Jackson 2.x integration to Jackson 3.x (tools.jackson namespace)
- JMH benchmarks (src/jmh/java) runnable via ./sbt "msgpack-jackson3/jmh:run"

Key differences from msgpack-jackson
- com.fasterxml.jackson → tools.jackson
- TokenStreamLocation replaces JsonLocation; byte offset passed as columnNr
- MessagePackWriteContext replaces SimpleStreamWriteContext
- MessagePackFactory.snapshot() delegates to copy(); rebuild() implemented via MessagePackFactoryBuilder
- MessagePackSerializedString: all append*, put*, write* methods fully implemented
- PROPERTY_NAME instead of VALUE_NULL

Test plan
- ./sbt "msgpack-jackson3/test": 112 tests pass
- ./sbt "msgpack-jackson3/jmh:compile": JMH benchmarks compile cleanly
- msgpack-core and msgpack-jackson tests unaffected